其他
5分钟快速掌握 Python 定时任务框架
APScheduler 简介
schedule
:第三方模块,该模块适合比较轻量级的一些调度任务,但却不适用于复杂时间的调度APScheduler
:第三方定时任务框架,是对 Java 第三方定时任务框架Quartz
的模仿与移植,能提供比schedule
更复杂的应用场景,并且各种组件都是模块化,易于使用与二次开发。Celery Beat
:属于celery
这分布式任务队列第三方库下的一个定时任务组件,如果使用需要配合 RabbitMQ 或 Redis 这类的消息队列套件,需要花费一定的时间在环境搭建上,但在高版本中已经不支持 Windows。
APScheduler
来对我们的调度任务或定时任务进行管理是个性价比极高的选择。而本文主要会带你快速上手有关 APScheduler
的使用。APScheduler 概念与组件
APScheduler
之前,我们需要对这个框架的一些概念简单了解,主要有那么以下几个:触发器(trigger) 任务持久化(job stores) 执行器(executor) 调度器(scheduler)
触发器(trigger)
APScheduler
中主要是指时间触发器,并且主要有三类时间触发器可供使用:date
:日期触发器。日期触发器主要是在某一日期时间点上运行任务时调用,是APScheduler
里面最简单的一种触发器。所以通常也适用于一次性的任务或作业调度。interval
:间隔触发器。间隔触发器是在日期触发器基础上扩展了对时间部分,比如时、分、秒、天、周这几个部分的设定。是我们用以对重复性任务进行设定或调度的一个常用调度器。设定了时间部分之后,从起始日期开始(默认是当前)会按照设定的时间去执行任务。cron
:cron
表达式触发器。cron
表达式触发器就等价于我们 Linux 上的 crontab,它主要用于更复杂的日期时间进行设定。但需要注意的是,APScheduler
不支持 6 位及以上的 cron 表达式,最多只支持到 5 位。
任务持久化(job stores)
APScheduler
就会根据对存储好的调度任务结果进行判断,如果出现已经过期但未执行的情况会进行相应的操作。APScheduler
为我们提供了多种持久化任务的途径,默认是使用 memory
也就是内存的形式,但内存并不是持久化最好的方式。最好的方式则是通过像数据库这样的载体来将我们的定时任务写入到磁盘当中,只要磁盘没有损坏就能将数据给恢复。APScheduler
支持的且常用的数据库主要有:sqlalchemy
形式的数据库,这里就主要是指各种传统的关系型数据库,如 MySQL、PostgreSQL、SQLite 等。mongodb
非结构化的 Mongodb 数据库,该类型数据库经常用于对非结构化或版结构化数据的存储或操作,如 JSON。redis
内存数据库,通常用作数据缓存来使用,当然通过一些主从复制等方式也能实现当中数据的持久化或保存。
Scheduler
实例时创建,或是单独为任务指定。配置的方式相对简单,我们只需要指定对应的数据库链接即可。执行器(executor)
APScheduler
里的执行器通常就是 ThreadPoolExecutor
或 ProcessPoolExecutor
这样的线程池和进程池两种。AsyncIOExecutor
、TwistedExecutor
和 GeventExecutor
三种执行器。调度器(scheduler)
APScheduler
的用途。根据用途的不同,APScheduler
又提供了以下几种调度器:BlockingScheduler
:阻塞调度器,当程序中没有任何存在主进程之中运行东西时,就则使用该调度器。BackgroundScheduler
:后台调度器,在不使用后面任何的调度器且希望在应用程序内部运行时的后台启动时才进行使用,如当前你已经开启了一个 Django 或 Flask 服务。AsyncIOScheduler
:AsyncIO
调度器,如果代码是通过asyncio
模块进行异步操作,使用该调度器。GeventScheduler
:Gevent
调度器,如果代码是通过gevent
模块进行协程操作,使用该调度器TornadoScheduler
:Tornado
调度器,在Tornado
框架中使用TwistedScheduler
:Twisted
调度器,在基于Twisted
的框架或应用程序中使用QtScheduler
:Qt
调度器,在构建Qt
应用中进行使用。
BlockingScheduler
调度器来进行操作,它会在当前进程中启动相应的线程来进行任务调度与处理;反之,如果是和 Web 项目或应用共存,那么需要选择 BackgroundScheduler
调度器,因为它不会干扰当前应用的线程或进程状况。APScheduler
的运行流程:设定调度器(scheduler)用以对任务的调度与安排进行全局统筹 对相应的函数或方法上设定相应的触发器(trigger),并添加到调度器中 如有任务持久化(job stores)需要则需要设定对应的持久化层,否则默认使用内存存储任务 当触发器被触发时,就将任务交由执行器(executor)进行执行
APScheduler 快速上手
APScheduler
里面的概念和组件看起来有点多,但在使用上并不算很复杂,我们可以通过本节的示例就能够很快使用。选择对应的 scheduler
scheduler
对象,所有的 scheduler
对象都被放在了 apscheduler.schedulers
模块下,我们可以直接通过查看 API 文档或者借助 IDE 补全的提示来获取相应的 scheduler
对象。BlockingScheduler
:# main.py
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
配置 scheduler
scheduler
的一些配置我们可以直接在实例化对象时就进行配置,当然也可以在创建实例化对象之后再进行配置。# main.py
from datetime import datetime
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
# 任务持久化 使用 SQLite
jobstores = {
'default': SQLAlchemyJobStore(url = 'sqlite:///jobs.db')
}
# 执行器配置
executors = {
'default': ThreadPoolExecutor(20),
}
# 关于 Job 的相关配置,见官方文档 API
job_defaults = {
'coalesce': False,
'next_run_time': datetime.now()
}
scheduler = BlockingScheduler(
jobstores = jobstores,
executors = executors,
job_defaults = job_defaults,
timezone = 'Asia/Shanghai'
)
scheduler.configure
方法进行同样的操作:scheduler = BlockingScheduler()
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone='Asia/Shanghai')
添加并执行你的任务
scheduler
对象之后,我们需要调用其下的 add_job()
或是 scheduled_job()
方法来将我们需要执行的函数进行注册。前者是以传参的形式指定对应的函数名,而后者则是以装饰器的形式直接对我们要执行的函数进行修饰。now()
:from datetime import datetime
def now(trigger):
print(f"trigger:{trigger} -> {datetime.now()}")
add_job()
可以这样写:if __name__ == '__main__':
scheduler.add_job(now, trigger = "interval", args = ("interval",), seconds = 5)
scheduler.start()
start()
方法之后调度器就会开始执行,并在控制台上看到对应的结果了:trigger:interval -> 2021-01-16 21:19:43.356674
trigger:interval -> 2021-01-16 21:19:46.679849
trigger:interval -> 2021-01-16 21:19:48.356595
@scheduled_job
的方式来装饰我们的任务或许会更加自由一些,于是上面的例子就可以写成这样:@scheduler.scheduled_job(trigger = "interval", args = ("interval",), seconds = 5)
def now(trigger):
print(f"trigger:{trigger} -> {datetime.now()}")
if __name__ == '__main__':
scheduler.start()
start()
方法执行前调用,否则会找不到任务或是抛出异常。将 APScheduler 集成到 Web 项目中
APScheduler
由于多样的调度器,我们能够将其和我们的项目结合到一起。Flask
,那么 Flask-APScheduler
这一别人写好的第三方包装库就很适合你,虽然它没有相关的文档,但只要你了解了前面我所介绍的有关于 APScheduler
的概念和组件,你就能很轻易地看懂这个第三方库仓库里的示例代码。APScheduler
本身也提供了一些对任务或作业的增删改查操作,我们可以自己编写一套合适的 API。temp-scheduler
├── config.py # 配置项
├── main.py # API 文件
└── scheduler.py # APScheduler 相关设置
安装依赖
这里我们需要的依赖不多,只需要简单几个即可:
pip install fastapi apscheduler sqlalchemy uvicorn
配置项
config.py
我们主要像 Flask 的配置那样简单设定:from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
class SchedulerConfig:
JOBSTORES = {"default": SQLAlchemyJobStore(url="sqlite:///job.db")}
EXECUTORS = {"default": ThreadPoolExecutor(20)}
JOB_DEFAULTS = {"coalesce": False}
@classmethod
def to_dict(cls):
return {
"jobstores": cls.JOBSTORES,
"executors": cls.EXECUTORS,
"job_defaults": cls.JOB_DEFAULTS,
}
SchedulerConfig
配置项中我们可以自己实现一个 to_dict()
类方法,以便我们后续传参时通过解包的方式直接传入配置参数即可。Scheduler 相关设置
scheduler.py
模块的设定也比较简单,即设定对应的 scheduler
调度器即可。由于是演示 demo 我还将要定期执行的任务也放在了这个模块当中:import logging
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from config import SchedulerConfig
scheduler = BackgroundScheduler()
logger = logging.getLogger(__name__)
def init_scheduler() -> None:
# config scheduler
scheduler.configure(**SchedulerConfig.to_dict())
logger.info("scheduler is running...")
# schedule test
scheduler.add_job(
func=mytask,
trigger="date",
args=("APScheduler Initialize.",),
next_run_time=datetime.now(),
)
scheduler.start()
def mytask(message: str) -> None:
print(f"[{datetime.now()}] message: {message}")
init_scheduler()
方法主要用于在 API 服务启动时被调用,然后对scheduler
对象的配置以及测试mytask()
则是我们要定期执行的任务,后续我们可以通过 APScheduler 提供的方法来自行添加任务
API 设置
main.py
模块就主要存放着我们由 FastAPI 所构建的相关 API。如果在后续开发时存在多个接口,此时就需要将不同接口放在不同模块文件中,以达到路由的分发与管理,类似于 Flask 的蓝图模式。import logging
import uuid
from datetime import datetime
from typing import Any, Dict, Optional, Sequence, Union
from fastapi import FastAPI
from pydantic import BaseModel
from scheduler import init_scheduler, mytask, scheduler
logger = logging.getLogger(__name__)
app = FastAPI(title="APScheduler API")
app.add_event_handler("startup", init_scheduler)
class Job(BaseModel):
id: Union[int, str, uuid.UUID]
name: Optional[str] = None
func: Optional[str] = None
args: Optional[Sequence[Optional[str]]] = None
kwargs: Optional[Dict[str, Any]] = None
executor: Optional[str] = None
misfire_grace_time: Optional[str] = None
coalesce: Optional[bool] = None
max_instances: Optional[int] = None
next_run_time: Optional[Union[str, datetime]] = None
@app.post("/add")
def add_job(
message: str,
trigger: str,
trigger_args: Optional[dict],
id: Union[str, int, uuid.UUID],
):
try:
scheduler.add_job(
func=mytask,
trigger=trigger,
kwargs={"message": message},
id=id,
**trigger_args,
)
except Exception as e:
logger.exception(e.args)
return {"status_code": 0, "message": "添加失败"}
return {"status_code": 1, "message": "添加成功"}
@app.delete("/delete/{id}")
def delete_job(id: Union[str, int, uuid.UUID]):
"""delete exist job by id"""
try:
scheduler.remove_job(job_id=id)
except Exception:
return dict(
message="删除失败",
status_code=0,
)
return dict(
message="删除成功",
status_code=1,
)
@app.put("/reschedule/{id}")
def reschedule_job(
id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]
):
try:
scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args)
except Exception as e:
logger.exception(e.args)
return dict(
message="修改失败",
status_code=0,
)
return dict(
message="修改成功",
status_code=1,
)
@app.get("/job")
def get_all_jobs():
jobs = None
try:
job_list = scheduler.get_jobs()
if job_list:
jobs = [Job(**task.__getstate__()) for task in job_list]
except Exception as e:
logger.exception(e.args)
return dict(
message="查询失败",
status_code=0,
jobs=jobs,
)
return dict(
message="查询成功",
status_code=1,
jobs=jobs,
)
@app.get("/job/{id}")
def get_job_by_id(id: Union[int, str, uuid.UUID]):
jobs = []
try:
job = scheduler.get_job(job_id=id)
if job:
jobs = [Job(**job.__getstate__())]
except Exception as e:
logger.exception(e.args)
return dict(
message="查询失败",
status_code=0,
jobs=jobs,
)
return dict(
message="查询成功",
status_code=1,
jobs=jobs,
)
FastAPI 对象 app
的初始化。这里用到的add_event_handler()
方法就有点像 Flask 中的before_first_request
,会在 Web 服务请求伊始进行操作,理解为初始化相关的操作即可。API 接口路由。路由通过 app
对象下的对应 HTTP 方法来实现,如GET
、POST
、PUT
等。这里的装饰器用法其实也和 Flask 很类似,就不多赘述。scheduler
对象的增删改查。从scheduler.py
模块中引入我们创建好的scheduler
对象之后就可以直接用来做增删改查的操作:增:使用 add_job()
方法,其主要的参数是要运行的函数(或方法)、触发器以及触发器参数等删:使用 delete_job()
方法,我们需要传入一个对应任务的id
参数,用以能够查找到对应的任务改:使用 reschedule_job()
方法,这里也需要一个对应任务的id
参数,以及需要重新修改的触发器及其参数查:使用 get_jobs()
和get_job()
两个方法,前者是直接获取到当前调度的所有任务,返回的是一个包含了APScheduler.job.Job
对象的列表,而后者是通过id
参数来查找对应的任务对象;这里我通过底层源码使用__getstate__()
来获取到任务的相关信息,这些信息我们通过事先设定好的Job
对象来对其进行序列化,最后将信息从接口中返回。
运行
uvicorn main:app
http://127.0.0.1:8000/docs
中看到关于全部接口的 Swagger 文档页面了:结尾
APScheduler
框架的概念及其用法,并进行了简单的实践。APScheduler
的模块化设计才可以让我们更方便地去理解、使用它,并将其运用到我们实际的开发过程中。作者:100gle,练习时长不到两年的非正经文科生一枚,喜欢敲代码、写写文章、捣鼓捣鼓各种新事物;现从事有关大数据分析与挖掘的相关工作。
赞 赏 作 者
更多阅读
特别推荐
点击下方阅读原文加入社区会员